/**
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { Anthropic } from '@anthropic-ai/sdk';
import type { DocumentBlockParam } from '@anthropic-ai/sdk/resources/messages';
import type {
  GenerateRequest,
  GenerateResponseChunkData,
  GenerateResponseData,
  MessageData,
  Part,
  Role,
} from 'genkit';
import { Message as GenkitMessage } from 'genkit';
import type { ToolDefinition } from 'genkit/model';

import {
  AnthropicConfigSchema,
  Media,
  MediaSchema,
  MediaType,
  MediaTypeSchema,
  type ClaudeRunnerParams,
  type ThinkingConfig,
} from '../types.js';

import {
  RunnerContentBlockParam,
  RunnerMessage,
  RunnerMessageParam,
  RunnerRequestBody,
  RunnerStream,
  RunnerStreamEvent,
  RunnerStreamingRequestBody,
  RunnerTool,
  RunnerToolResponseContent,
  RunnerTypes,
} from './types.js';

const ANTHROPIC_THINKING_CUSTOM_KEY = 'anthropicThinking';

/**
 * Shared runner logic for Anthropic SDK integrations.
 *
 * Concrete subclasses pass in their SDK-specific type bundle via `RunnerTypes`,
 * letting this base class handle message/tool translation once for both the
 * stable and beta APIs that share the same conceptual surface.
 */
export abstract class BaseRunner<ApiTypes extends RunnerTypes> {
  protected name: string;
  protected client: Anthropic;
  protected cacheSystemPrompt?: boolean;

  /**
   * Default maximum output tokens for Claude models when not specified in the request.
   */
  protected readonly DEFAULT_MAX_OUTPUT_TOKENS = 4096;

  constructor(params: ClaudeRunnerParams) {
    this.name = params.name;
    this.client = params.client;
    this.cacheSystemPrompt = params.cacheSystemPrompt;
  }

  /**
   * Converts a Genkit role to the corresponding Anthropic role.
   */
  protected toAnthropicRole(
    role: Role,
    toolMessageType?: 'tool_use' | 'tool_result'
  ): 'user' | 'assistant' {
    if (role === 'user') {
      return 'user';
    }
    if (role === 'model') {
      return 'assistant';
    }
    if (role === 'tool') {
      return toolMessageType === 'tool_use' ? 'assistant' : 'user';
    }
    throw new Error(`Unsupported genkit role: ${role}`);
  }

  protected isMediaType(value: string): value is MediaType {
    return MediaTypeSchema.safeParse(value).success;
  }

  protected isMediaObject(obj: unknown): obj is Media {
    return MediaSchema.safeParse(obj).success;
  }

  /**
   * Checks if a URL is a data URL (starts with 'data:').
   */
  protected isDataUrl(url: string): boolean {
    return url.startsWith('data:');
  }

  protected extractDataFromBase64Url(
    url: string
  ): { data: string; contentType: string } | null {
    const match = url.match(/^data:([^;]+);base64,(.+)$/);
    return (
      match && {
        contentType: match[1],
        data: match[2],
      }
    );
  }

  /**
   * Both the stable and beta Anthropic SDKs accept the same JSON shape for PDF
   * document sources (either `type: 'base64'` with a base64 payload or `type: 'url'`
   * with a public URL). Even though the return type references the stable SDK
   * union, TypeScript’s structural typing lets the beta runner reuse this helper.
   */
  protected toPdfDocumentSource(media: Media): DocumentBlockParam['source'] {
    if (media.contentType !== 'application/pdf') {
      throw new Error(
        `PDF contentType mismatch: expected application/pdf, got ${media.contentType}`
      );
    }
    const url = media.url;
    if (this.isDataUrl(url)) {
      const extracted = this.extractDataFromBase64Url(url);
      if (!extracted) {
        throw new Error(
          `Invalid PDF data URL format: ${url.substring(0, 50)}...`
        );
      }
      const { data, contentType } = extracted;
      if (contentType !== 'application/pdf') {
        throw new Error(
          `PDF contentType mismatch: expected application/pdf, got ${contentType}`
        );
      }
      return {
        type: 'base64',
        media_type: 'application/pdf',
        data,
      };
    }
    return {
      type: 'url',
      url,
    };
  }

  /**
   * Normalizes Genkit `Media` into either a base64 payload or a remote URL
   * accepted by the Anthropic SDK. Anthropic supports both `data:` URLs (which
   * we forward as base64) and remote `https` URLs without additional handling.
   */
  protected toImageSource(
    media: Media
  ):
    | { kind: 'base64'; data: string; mediaType: MediaType }
    | { kind: 'url'; url: string } {
    if (this.isDataUrl(media.url)) {
      const extracted = this.extractDataFromBase64Url(media.url);
      const { data, contentType } = extracted ?? {};
      if (!data || !contentType) {
        throw new Error(
          `Invalid genkit part media provided to toAnthropicMessageContent: ${JSON.stringify(
            media
          )}.`
        );
      }

      const resolvedMediaType = contentType;
      if (!resolvedMediaType) {
        throw new Error('Media type is required but was not provided');
      }
      if (!this.isMediaType(resolvedMediaType)) {
        // Provide helpful error message for text files
        if (resolvedMediaType === 'text/plain') {
          throw new Error(
            `Unsupported media type: ${resolvedMediaType}. Text files should be sent as text content in the message, not as media. For example, use { text: '...' } instead of { media: { url: '...', contentType: 'text/plain' } }`
          );
        }
        throw new Error(`Unsupported media type: ${resolvedMediaType}`);
      }
      return {
        kind: 'base64',
        data,
        mediaType: resolvedMediaType,
      };
    }

    if (!media.url) {
      throw new Error('Media url is required but was not provided');
    }

    // For non-data URLs, use the provided contentType or default to a generic type
    // Note: Anthropic will validate the actual content when fetching from URL
    if (media.contentType) {
      if (!this.isMediaType(media.contentType)) {
        // Provide helpful error message for text files
        if (media.contentType === 'text/plain') {
          throw new Error(
            `Unsupported media type: ${media.contentType}. Text files should be sent as text content in the message, not as media. For example, use { text: '...' } instead of { media: { url: '...', contentType: 'text/plain' } }`
          );
        }
        throw new Error(`Unsupported media type: ${media.contentType}`);
      }
    }

    return {
      kind: 'url',
      url: media.url,
    };
  }

  /**
   * Converts tool response output to the appropriate Anthropic content format.
   * Handles Media objects, data URLs, strings, and other outputs.
   */
  protected toAnthropicToolResponseContent(
    part: Part
  ): RunnerToolResponseContent<ApiTypes> {
    const output = part.toolResponse?.output ?? {};

    // Handle Media objects (images returned by tools)
    if (this.isMediaObject(output)) {
      const { data, contentType } =
        this.extractDataFromBase64Url(output.url) ?? {};
      if (data && contentType) {
        if (!this.isMediaType(contentType)) {
          // Provide helpful error message for text files
          if (contentType === 'text/plain') {
            throw new Error(
              `Unsupported media type: ${contentType}. Text files should be sent as text content, not as media.`
            );
          }
          throw new Error(`Unsupported media type: ${contentType}`);
        }
        return {
          type: 'image',
          source: {
            type: 'base64',
            data,
            media_type: contentType,
          },
        };
      }
    }

    // Handle string outputs - check if it's a data URL
    if (typeof output === 'string') {
      // Check if string is a data URL (e.g., "data:image/gif;base64,...")
      if (this.isDataUrl(output)) {
        const { data, contentType } =
          this.extractDataFromBase64Url(output) ?? {};
        if (data && contentType) {
          if (!this.isMediaType(contentType)) {
            // Provide helpful error message for text files
            if (contentType === 'text/plain') {
              throw new Error(
                `Unsupported media type: ${contentType}. Text files should be sent as text content, not as media.`
              );
            }
            throw new Error(`Unsupported media type: ${contentType}`);
          }
          return {
            type: 'image',
            source: {
              type: 'base64',
              data,
              media_type: contentType,
            },
          };
        }
      }
      // Regular string output
      return {
        type: 'text',
        text: output,
      };
    }

    // Handle other outputs by stringifying
    return {
      type: 'text',
      text: JSON.stringify(output),
    };
  }

  protected createThinkingPart(thinking: string, signature?: string): Part {
    const custom =
      signature !== undefined
        ? {
            [ANTHROPIC_THINKING_CUSTOM_KEY]: { signature },
          }
        : undefined;
    return custom
      ? {
          reasoning: thinking,
          custom,
        }
      : {
          reasoning: thinking,
        };
  }

  protected getThinkingSignature(part: Part): string | undefined {
    const custom = part.custom as Record<string, unknown> | undefined;
    const thinkingValue = custom?.[ANTHROPIC_THINKING_CUSTOM_KEY];
    if (
      typeof thinkingValue === 'object' &&
      thinkingValue !== null &&
      'signature' in thinkingValue &&
      typeof (thinkingValue as { signature: unknown }).signature === 'string'
    ) {
      return (thinkingValue as { signature: string }).signature;
    }
    return undefined;
  }

  protected getRedactedThinkingData(part: Part): string | undefined {
    const custom = part.custom as Record<string, unknown> | undefined;
    const redacted = custom?.redactedThinking;
    return typeof redacted === 'string' ? redacted : undefined;
  }

  protected toAnthropicThinkingConfig(
    config: ThinkingConfig | undefined
  ):
    | { type: 'enabled'; budget_tokens: number }
    | { type: 'disabled' }
    | undefined {
    if (!config) return undefined;

    const { enabled, budgetTokens } = config;

    if (enabled === true) {
      if (budgetTokens === undefined) {
        return undefined;
      }
      return { type: 'enabled', budget_tokens: budgetTokens };
    }

    if (enabled === false) {
      return { type: 'disabled' };
    }

    if (budgetTokens !== undefined) {
      return { type: 'enabled', budget_tokens: budgetTokens };
    }

    return undefined;
  }

  protected toWebSearchToolResultPart(params: {
    toolUseId: string;
    content: unknown;
    type: string;
  }): Part {
    const { toolUseId, content, type } = params;
    return {
      text: `[Anthropic server tool result ${toolUseId}] ${JSON.stringify(content)}`,
      custom: {
        anthropicServerToolResult: {
          type,
          toolUseId,
          content,
        },
      },
    };
  }

  /**
   * Converts a Genkit Part to the corresponding Anthropic content block.
   * Each runner implements this to return its specific API type.
   */
  protected abstract toAnthropicMessageContent(
    part: Part
  ): RunnerContentBlockParam<ApiTypes>;

  /**
   * Converts Genkit messages to Anthropic format.
   * Extracts system message and converts remaining messages using the runner's
   * toAnthropicMessageContent implementation.
   */
  protected toAnthropicMessages(messages: MessageData[]): {
    system?: string;
    messages: RunnerMessageParam<ApiTypes>[];
  } {
    let system: string | undefined;

    if (messages[0]?.role === 'system') {
      const systemMessage = messages[0];
      const textParts: string[] = [];

      for (const part of systemMessage.content ?? []) {
        if (part.text) {
          textParts.push(part.text);
        } else if (part.media || part.toolRequest || part.toolResponse) {
          throw new Error(
            'System messages can only contain text content. Media, tool requests, and tool responses are not supported in system messages.'
          );
        }
      }

      // Concatenate multiple text parts into a single string.
      // Note: The Anthropic SDK supports system as string | Array<TextBlockParam>,
      // so we could alternatively preserve the multi-part structure as:
      //   system = textParts.map(text => ({ type: 'text', text }))
      // However, concatenation is simpler and maintains semantic equivalence while
      // keeping the cache control logic straightforward in the concrete runners.
      system = textParts.length > 0 ? textParts.join('\n\n') : undefined;
    }

    const messagesToIterate =
      system !== undefined ? messages.slice(1) : messages;
    const anthropicMsgs: RunnerMessageParam<ApiTypes>[] = [];

    for (const message of messagesToIterate) {
      const msg = new GenkitMessage(message);

      // Detect tool message kind from Genkit Parts (no SDK typing needed)
      const hadToolUse = msg.content.some((p) => !!p.toolRequest);
      const hadToolResult = msg.content.some((p) => !!p.toolResponse);

      const toolMessageType = hadToolUse
        ? ('tool_use' as const)
        : hadToolResult
          ? ('tool_result' as const)
          : undefined;

      const role = this.toAnthropicRole(message.role, toolMessageType);

      const content = msg.content.map((part) =>
        this.toAnthropicMessageContent(part)
      );

      anthropicMsgs.push({ role, content });
    }

    return { system, messages: anthropicMsgs };
  }

  /**
   * Converts a Genkit ToolDefinition to an Anthropic Tool object.
   */
  protected toAnthropicTool(tool: ToolDefinition): RunnerTool<ApiTypes> {
    return {
      name: tool.name,
      description: tool.description,
      input_schema: tool.inputSchema,
    } as RunnerTool<ApiTypes>;
  }

  /**
   * Converts an Anthropic request to a non-streaming Anthropic API request body.
   * @param modelName The name of the Anthropic model to use.
   * @param request The Genkit GenerateRequest to convert.
   * @param cacheSystemPrompt Whether to cache the system prompt.
   * @returns The converted Anthropic API non-streaming request body.
   * @throws An error if an unsupported output format is requested.
   */
  protected abstract toAnthropicRequestBody(
    modelName: string,
    request: GenerateRequest<typeof AnthropicConfigSchema>,
    cacheSystemPrompt?: boolean
  ): RunnerRequestBody<ApiTypes>;

  /**
   * Converts an Anthropic request to a streaming Anthropic API request body.
   * @param modelName The name of the Anthropic model to use.
   * @param request The Genkit GenerateRequest to convert.
   * @param cacheSystemPrompt Whether to cache the system prompt.
   * @returns The converted Anthropic API streaming request body.
   * @throws An error if an unsupported output format is requested.
   */
  protected abstract toAnthropicStreamingRequestBody(
    modelName: string,
    request: GenerateRequest<typeof AnthropicConfigSchema>,
    cacheSystemPrompt?: boolean
  ): RunnerStreamingRequestBody<ApiTypes>;

  protected abstract createMessage(
    body: RunnerRequestBody<ApiTypes>,
    abortSignal: AbortSignal
  ): Promise<RunnerMessage<ApiTypes>>;

  protected abstract streamMessages(
    body: RunnerStreamingRequestBody<ApiTypes>,
    abortSignal: AbortSignal
  ): RunnerStream<ApiTypes>;

  protected abstract toGenkitResponse(
    message: RunnerMessage<ApiTypes>
  ): GenerateResponseData;

  protected abstract toGenkitPart(
    event: RunnerStreamEvent<ApiTypes>
  ): Part | undefined;

  public async run(
    request: GenerateRequest<typeof AnthropicConfigSchema>,
    options: {
      streamingRequested: boolean;
      sendChunk: (chunk: GenerateResponseChunkData) => void;
      abortSignal: AbortSignal;
    }
  ): Promise<GenerateResponseData> {
    const { streamingRequested, sendChunk, abortSignal } = options;

    if (streamingRequested) {
      const body = this.toAnthropicStreamingRequestBody(
        this.name,
        request,
        this.cacheSystemPrompt
      );
      const stream = this.streamMessages(body, abortSignal);
      for await (const event of stream) {
        const part = this.toGenkitPart(event);
        if (part) {
          sendChunk({
            index: 0,
            content: [part],
          });
        }
      }
      const finalMessage = await stream.finalMessage();
      return this.toGenkitResponse(finalMessage);
    }

    const body = this.toAnthropicRequestBody(
      this.name,
      request,
      this.cacheSystemPrompt
    );
    const response = await this.createMessage(body, abortSignal);
    return this.toGenkitResponse(response);
  }
}
